package com.izhengyin.demo.concurrent.basic.collect;

import com.izhengyin.demo.concurrent.SleepUtils;

import java.util.Date;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 * @author zhengyin
 * Created on 2021/11/30
 */
public class ArrayBlockingQueueTest {
    private final static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(100);

    public static void main(String[] args){
        Executors.newFixedThreadPool(1).execute(() -> {
            consumer();
        });

        IntStream.rangeClosed(1,5)
            .forEach(i -> {
                for(int n=0;n<10;n++){
                    try {
                        //队列满后将阻塞
                        blockingQueue.put(n);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                SleepUtils.sleep(1000);
            });
    }

    public static void consumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        IntStream.range(0,5)
            .forEach(x -> {
                executorService.execute(() -> {
                    while (true){
                        Integer i = null;
                        try {
                            //队列无数据将阻塞
                            i = blockingQueue.take();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName()+" , "+new Date()+" => pull "+i);
                    }
                });
            });
    }
}
